Reactor 模式的理解

Posted by 张伟真 on 2024-08-08
Estimated Reading Time 6 Minutes
Words 1.5k In Total
Viewed Times

理解 Reactor 模式的 3 个重要概念:Publisher、Subscriber和Subscription

在Reactor中,有三个重要的概念:Publisher、Subscriber和Subscription。

  1. Publisher:Publisher是一个接口,在Reactor中用于表示数据源。它可以产生数据流,将数据推送给订阅者。Publisher接口定义了subscribe()方法,用于订阅并连接Subscriber和数据源。

  2. Subscriber:Subscriber是一个接口,用于接收数据流并处理数据。Subscriber通过实现onSubscribe()、onNext()、onError()和onComplete()等方法来定义对数据流的处理逻辑。当Subscriber通过subscribe()方法订阅Publisher时,它将与Publisher建立连接,并开始接收数据流。

  3. Subscription:Subscription是一个接口,用于表示订阅关系。当Subscriber订阅Publisher时,Publisher将创建一个Subscription实例。Subscription接口定义了request()和cancel()方法,用于请求更多的数据或取消订阅。

这三个概念共同构成了Reactor中的反应式流处理模型。Publisher负责产生数据流,Subscriber负责接收和处理数据流,而Subscription则管理Subscriber和Publisher之间的订阅关系。通过这种模型,Reactor提供了一种非阻塞、异步的编程模式,使得处理数据流变得简单、灵活和高效。

下面是一个简单的示例代码,演示了如何实现Publisher、Subscriber和Subscription接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ReactorDemo {
public static void main(String[] args) {
// 创建一个简单的Publisher
Publisher<String> publisher = new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
// 创建一个新的Subscription
Subscription subscription = new Subscription() {
// 使用计数器来模拟数据流
private int counter = 0;
private boolean terminated = false;

@Override
public void request(long n) {
// 请求n个数据
for (int i = 0; i < n; i++) {
// 发送数据到Subscriber
if (!terminated) {
String data = "Data " + (++counter);
subscriber.onNext(data);
}
}
// 如果已终止,则发送完成信号
if (terminated) {
subscriber.onComplete();
}
}

@Override
public void cancel() {
// 取消订阅
terminated = true;
}
};

// 通知Subscriber进行订阅
subscriber.onSubscribe(subscription);
}
};

// 创建一个简单的Subscriber
Subscriber<String> subscriber = new Subscriber<String>() {
private Subscription subscription;
private int receivedCounter = 0;

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
// 请求一个数据
subscription.request(1);
}

@Override
public void onNext(String s) {
// 处理接收到的数据
System.out.println("Received: " + s);
receivedCounter++;
// 请求下一个数据
subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
// 处理错误
throwable.printStackTrace();
}

@Override
public void onComplete() {
// 处理完成信号
System.out.println("Received " + receivedCounter + " data");
}
};

// 订阅Publisher
publisher.subscribe(subscriber);
}
}

在这个示例中,我们创建了一个简单的Publisher实现,它通过计数器模拟产生数据流,并将数据推送给Subscriber。Subscriber实现中,我们在onSubscribe()方法中请求一个数据,并在onNext()方法中处理接收到的数据,然后再请求下一个数据,直到数据流结束。当数据流完成后,我们在onComplete()方法中处理完成信号。

通过实现这三个接口,我们可以方便地创建和控制反应式数据流,实现自定义的数据处理逻辑。在实际应用中,我们可以使用Reactor提供的各种操作符和工具类来构建更复杂的流处理逻辑。

===============================

个人理解:

Publisher 只定义了 subscribe() 方法,他是用来为 Subscriber 建立订阅会话用的(Subscription 就是会话),就类似 kafka 的消费组的概念,也可以比喻成我们跑去消息中心建了一条光缆。

从这个定义我们可以推理出:

  1. Publisher 怎么生产数据是不受约束的。
  2. Subscription 保存在哪里没约束。
  3. Subscriber 保存在哪也没有约束。
  4. Publisher 怎么把生产消息推送给 Subscription 也没约束。
  5. Publisher 消息的类型也没约束。
  6. Publisher 是单例还是多例,也没规定。
  7. Publisher 没有约定已发送的消息就必须删除。

Subscriber 负责的核心方法是 onNext(), 这个方法可以看成是 消息消费者的一个接收消息的 API。
从这个定义我们可以推理出:

  1. 给对象最核心的方法是 onNext()。( 其他 onSubscribe onError onComplete 均可有可无)
  2. onNext() 没有定义重试和事务等机制, 因为作者假定了会话的处理是单调稳定的,又或者说作者认为事务由 Subscriber 去保障(Publisher 只把话说一次,能悟不能悟就看 Subscriber 造化 )。
  3. 他的 onSubscribe onError onComplete 看上去是围绕 Subscriber 的生命周期设计的,其实不是,他围绕的是 Subscription 的生命周期。
  4. Subscriber 没有约束 onNext() 被调用的时机(由谁触发)。
  5. Subscriber 没有约束 onNext() 的调用方式(在哪里调用)。
  6. Subscriber 没有约束 onNext() 处理什么业务,以及 怎么处理。
  7. onError 也没有约束 在哪里调用,所以几乎你能想到的地方都可以调用, 因为 Subscription 覆盖了所有上下文,他包括了 Publisher 与 Subscriber。

Subscription 只定义了 request() 方法,用来通知 Publisher 发消息给 Subscriber,以及定义了 cancel() 用来通知 Publisher 别发消息给 Subscriber。
从这个定义我们可以推理出:

  1. request() 没有约束什么时候调用、也没有约束由谁调用。
  2. request() 没有约束如何通知 Publisher。
  3. request() 没有约束 Publisher 如何发送消息,仅仅是一种建议。
  4. request() 没有约束 Publisher 从哪里开始发消息给 Subscriber。
  5. cancel() 也是一样,非常微不足道,以至于对 Publisher 的约束度很松散。

因此,Publisher 和 Subscription、Subscriber 约束如此的少,所以,他的实现类千奇百怪,学习起来很晦涩,但我们只要记住他是一种生产者消费者模式即可,只是多了一个 Subscription.request() 实现对消费者负载保护罢了(就好像 TCP 拥堵控制算法对网络的保护)。

常规实现:
1.利用装饰者模式嵌套调用。
2.subscribe() 中的 onSubscribe() 执行首次 request()
3.在 request() 中执行消息发送给 Subscriber 的动作,onNext()
4.在 onNext() 中执行 hook 操作。
5.如果由于是装饰者模式,所以在执行完当前的 onNext() 后,还需要执行hook链上其他 Subscriber 的onNext() 消息。


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !